Fix DebeziumIO resuming from worker restart#37689
Fix DebeziumIO resuming from worker restart#37689Abacn wants to merge 1 commit intoapache:masterfrom
Conversation
Summary of ChangesHello @Abacn, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the robustness and configurability of DebeziumIO within Apache Beam. The primary goal was to prevent pipeline crash loops during worker restarts and ensure seamless data consumption. This was achieved by correctly initializing internal state, refining the record polling mechanism, and exposing a new configuration option for polling timeouts. These changes collectively improve the stability and operational flexibility of Debezium-based data pipelines. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
f86b8df to
9b33b7f
Compare
|
Assigning reviewers: R: @ahmedabu98 for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
* Move startTime recording into setup to fix NPE in restarted worker * Fix DebeziumIO poll loop not exiting when record list isn't empty * Make pollTimeout configurable
9b33b7f to
8b12359
Compare
| } | ||
| pollTimer.reset().start(); | ||
| task.commit(); | ||
| records = task.poll(); |
There was a problem hiding this comment.
This task.poll() could take much longer than pollingTimeOut, if we have a way to set timeout for task.poll() operation which respects the total pollingTimeOut, that would be better!
Part of #28248
Resolves crash loop when worker restart (same pipeline).
Move startTime recording into setup to fix NPE in restarted worker
Fix DebeziumIO poll loop not exiting when record list isn't empty - same issue for KafkaIO reported in [KafkaIO] Exit the poll loop in ReadFromKafkaDoFn after consumerPollingTimeout has elapsed #36029
Make pollTimeout configurable
Please add a meaningful description for your change here
Tested on Dataflow runner with following error injection procedure:
After pipeline started consuming data, ssh into worker machine and kill java process
logs seen:
confirmed worker does not read from the beginning again. Then add some rows, see it is consumed:
While this has fixed Dataflow runner worker crash / restart, I think there is another FR in #28248 to support start consumer from a specific offset. Keep the issue open for now.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.